package com.luo.d3s.ext.event.jdbc.repository;

import com.luo.d3s.core.domain.event.DomainEvent;
import com.luo.d3s.core.domain.event.DomainEventRepository;
import com.luo.d3s.core.domain.event.DomainEventStatus;
import com.luo.d3s.ext.event.jdbc.convertor.DomainEventConvertor;
import com.luo.d3s.ext.event.jdbc.dataobject.DomainEventDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * JDBC-backed implementation of {@link DomainEventRepository}.
 *
 * <p>All statements target the {@code domain_event} table and are executed through the
 * injected {@link NamedParameterJdbcTemplate}.
 *
 * @author luohq
 * @date 2022-12-22 13:30
 */
@Component
public class DomainEventRepositoryImpl implements DomainEventRepository {

    private static final Logger log = LoggerFactory.getLogger(DomainEventRepositoryImpl.class);

    private static final String SQL_SAVE = "insert into domain_event(id, aggregate_id, json_type, json_content, status, create_time) "
            + "values(:id, :aggregateId, :jsonType, :jsonContent, :status, :createTime)";

    private static final String SQL_UPDATE_STATUS = "update domain_event set status = :status , update_time = :updateTime where id = :id";

    private static final String SQL_FIND_TO_PUBLISH = "select id, aggregate_id, json_type, json_content, status, create_time "
            + "from domain_event "
            + "where status = :status "
            + "order by create_time asc";

    private static final String SQL_FIND_TO_PUBLISH_BEFORE_DATETIME = "select id, aggregate_id, json_type, json_content, status, create_time "
            + "from domain_event "
            + "where status = :status and create_time <= :createTime "
            + "order by create_time asc";

    private static final String SQL_DELETE_BEFORE_DATETIME = "delete from domain_event where create_time <= ? and status = ?";

    private final NamedParameterJdbcTemplate npJdbcTemplate;

    /**
     * Creates the repository.
     *
     * @param npJdbcTemplate template used to execute every statement issued by this class
     */
    public DomainEventRepositoryImpl(NamedParameterJdbcTemplate npJdbcTemplate) {
        this.npJdbcTemplate = npJdbcTemplate;
    }

    /**
     * Inserts the given event as a new row.
     *
     * @param event event to persist; converted with {@link DomainEventConvertor#toEventDo}
     */
    @Override
    public void save(DomainEvent event) {
        DomainEventDo domainEventDo = DomainEventConvertor.toEventDo(event);
        this.npJdbcTemplate.update(SQL_SAVE, new BeanPropertySqlParameterSource(domainEventDo));
    }

    /**
     * Loads every event whose status is still {@code CREATED}, oldest first.
     *
     * @return the matching events ordered by {@code create_time} ascending; empty (never
     *         {@code null}) when nothing matches
     */
    @Override
    public List<DomainEvent> toPublish() {
        DomainEventDo criteria = new DomainEventDo();
        criteria.setStatus(DomainEventStatus.CREATED.getStatus());
        return queryEvents(SQL_FIND_TO_PUBLISH, criteria);
    }

    /**
     * Loads every event whose status is still {@code CREATED} and whose {@code create_time}
     * is at or before the given instant, oldest first.
     *
     * @param lastCreateTime inclusive upper bound for {@code create_time}
     * @return the matching events ordered by {@code create_time} ascending; empty (never
     *         {@code null}) when nothing matches
     */
    @Override
    public List<DomainEvent> toPublish(LocalDateTime lastCreateTime) {
        DomainEventDo criteria = new DomainEventDo();
        criteria.setStatus(DomainEventStatus.CREATED.getStatus());
        criteria.setCreateTime(lastCreateTime);
        return queryEvents(SQL_FIND_TO_PUBLISH_BEFORE_DATETIME, criteria);
    }

    /**
     * Marks all given events as {@code PUBLISHED} in a single batch update.
     *
     * @param events events to mark; a {@code null} or empty list is a no-op, and
     *               {@code null} elements are skipped (mirroring {@link #published(DomainEvent)})
     */
    @Override
    public void published(List<DomainEvent> events) {
        if (CollectionUtils.isEmpty(events)) {
            return;
        }
        SqlParameterSource[] batchParams = events.stream()
                .filter(Objects::nonNull)
                .map(this::publishedParams)
                .toArray(SqlParameterSource[]::new);
        this.npJdbcTemplate.batchUpdate(SQL_UPDATE_STATUS, batchParams);
    }

    /**
     * Marks a single event as {@code PUBLISHED}.
     *
     * @param event event to mark; {@code null} is a no-op
     */
    @Override
    public void published(DomainEvent event) {
        if (Objects.isNull(event)) {
            return;
        }
        this.npJdbcTemplate.update(SQL_UPDATE_STATUS, publishedParams(event));
    }

    /**
     * Deletes {@code PUBLISHED} events whose {@code create_time} is at least
     * {@code retentionDays} days in the past, and logs the deleted row count at debug level.
     *
     * @param retentionDays number of days to keep published events
     * @throws NullPointerException if {@code retentionDays} is {@code null}
     */
    @Override
    public void deleteBeforeDays(Integer retentionDays) {
        // Fail with an explicit message instead of an anonymous unboxing NPE.
        Objects.requireNonNull(retentionDays, "retentionDays must not be null");
        LocalDateTime beforeDay = LocalDateTime.now().minusDays(retentionDays);
        int deleteCount = this.npJdbcTemplate.getJdbcTemplate()
                .update(SQL_DELETE_BEFORE_DATETIME, beforeDay, DomainEventStatus.PUBLISHED.getStatus());
        log.debug("Clean domain events, total clean count: {}", deleteCount);
    }

    /** Runs a select using the bean properties of {@code criteria} as named parameters and converts the rows to events. */
    private List<DomainEvent> queryEvents(String sql, DomainEventDo criteria) {
        List<DomainEventDo> rows = this.npJdbcTemplate.query(sql,
                new BeanPropertySqlParameterSource(criteria),
                new BeanPropertyRowMapper<>(DomainEventDo.class));

        if (CollectionUtils.isEmpty(rows)) {
            // Return an empty list rather than null so callers can iterate unconditionally.
            return new ArrayList<>();
        }
        return rows.stream()
                .map(DomainEventConvertor::toEvent)
                .map(DomainEvent.class::cast)
                .collect(Collectors.toList());
    }

    /** Builds the named parameters (id, status, updateTime) that flip one event to {@code PUBLISHED}. */
    private SqlParameterSource publishedParams(DomainEvent event) {
        DomainEventDo domainEventDo = new DomainEventDo();
        domainEventDo.setId(String.valueOf(event.getEventId()));
        domainEventDo.setStatus(DomainEventStatus.PUBLISHED.getStatus());
        domainEventDo.setUpdateTime(LocalDateTime.now());
        return new BeanPropertySqlParameterSource(domainEventDo);
    }
}
